package com.hbase.dao;

import com.hbase.dao.constant.HBaseConstant;
import com.hbase.dao.entity.HBaseQueryEntity;
import com.hbase.dao.entity.HBaseQueryResultEntity;
import com.hbase.dao.entity.HBaseRecordEntity;
import com.hbase.dao.exception.HBaseDaoException;
import com.hbase.dao.pool.HTablePoolEngine;
import com.hbase.dao.resolve.HBaseEntityResolve;
import com.hbase.dao.resolve.HBaseTableEntityAnnotation;
import com.hbase.dao.resolve.HBaseTableEntityAnnotationManage;
import com.hbase.dao.util.CheckUtil;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.filter.KeyOnlyFilter;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;

/**
 * HBaseDao方法实现
 * 
 * @author yangshenhui
 * @version 1.0
 */
@SuppressWarnings("deprecation")
public class HBaseDaoImpl extends HBaseBaseDao {
	private static Logger logger = Logger.getLogger(HBaseDaoImpl.class);
	/**
	 * 此次操作HBase的标识,通过标识可以获得创建的Configuration、HTablePool、HTable
	 */
	private String hBaseInstanceName;

	/**
	 * Creates a DAO bound to the default HBase instance name,
	 * {@code HBaseConstant.HBASE_DEFAULT_INSTANCE}.
	 */
	public HBaseDaoImpl() {
		this(HBaseConstant.HBASE_DEFAULT_INSTANCE);
	}

	/**
	 * Creates a DAO bound to the given HBase instance name. The name is passed
	 * to {@code HTablePoolEngine.getHTable} whenever this DAO needs a table.
	 *
	 * @param hBaseInstanceName
	 *            identifier of the HBase instance to operate on
	 */
	public HBaseDaoImpl(String hBaseInstanceName) {
		this.hBaseInstanceName = hBaseInstanceName;
	}

	/**
	 * Writes a single record by wrapping it in a one-element list and
	 * delegating to the two-argument list overload of {@code put} (not defined
	 * in this file).
	 *
	 * @param tableName
	 *            target table name, checked with
	 *            {@code CheckUtil.checkEmptyString}
	 * @param hBaseRecordEntity
	 *            record to write, checked with {@code CheckUtil.checkNull}
	 * @throws HBaseDaoException
	 *             propagated from the delegated call
	 */
	@Override
	public void put(String tableName, HBaseRecordEntity hBaseRecordEntity)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseRecordEntity);
		List<HBaseRecordEntity> singleRecord = Lists.newArrayList();
		singleRecord.add(hBaseRecordEntity);
		put(tableName, singleRecord);
	}

	/**
	 * Converts the records to {@link Put}s and hands them to the supplied
	 * table via {@code submitPut}, which calls {@code HTableInterface.put}
	 * without flushing. The table is neither flushed nor closed here; both are
	 * left to the caller who owns it.
	 * <p>
	 * Records for which {@code createPut} yields {@code null} (no cell data)
	 * are logged as errors and skipped.
	 *
	 * @param hTableInterface
	 *            caller-owned table to write to
	 * @param hBaseRecordEntityList
	 *            records to write
	 * @param isWriteToWAL
	 *            value passed to {@code Put.setWriteToWAL} for every row
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} raised by the write
	 */
	@Override
	public void put(HTableInterface hTableInterface,
			List<HBaseRecordEntity> hBaseRecordEntityList, boolean isWriteToWAL)
			throws HBaseDaoException {
		CheckUtil.checkNull(hTableInterface);
		CheckUtil.checkEmptyCollection(hBaseRecordEntityList);
		try {
			List<Put> putList = Lists.newArrayList();
			for (HBaseRecordEntity hBaseRecordEntity : hBaseRecordEntityList) {
				Put put = createPut(hBaseRecordEntity, null);
				if (CheckUtil.isNull(put)) {
					logger.error(String.format("错误的数据行%s.", hBaseRecordEntity));
					continue;
				}
				put.setWriteToWAL(isWriteToWAL);
				putList.add(put);
			}
			// submitPut clears the list, so capture the real row count first.
			// Logging the input size would over-report when rows were skipped.
			int writtenCount = putList.size();
			submitPut(hTableInterface, putList);
			logger.info(String.format("插入%s条数据到HBase表[%s].",
					writtenCount,
					Bytes.toString(hTableInterface.getTableName())));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		}
	}

	/**
	 * Obtains the named table from {@code HTablePoolEngine}, converts the
	 * records to {@link Put}s, writes and flushes them via {@code commitPut},
	 * and hands the table back to the pool in all cases.
	 * <p>
	 * Records for which {@code createPut} yields {@code null} (no cell data)
	 * are logged as errors and skipped.
	 *
	 * @param tableName
	 *            target table name
	 * @param hBaseRecordEntityList
	 *            records to write
	 * @param isWriteToWAL
	 *            value passed to {@code Put.setWriteToWAL} for every row
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} raised by the write
	 */
	@Override
	public void put(String tableName,
			List<HBaseRecordEntity> hBaseRecordEntityList, boolean isWriteToWAL)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyCollection(hBaseRecordEntityList);
		HTableInterface hTableInterface = null;
		try {
			List<Put> putList = Lists.newArrayList();
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			for (HBaseRecordEntity hBaseRecordEntity : hBaseRecordEntityList) {
				Put put = createPut(hBaseRecordEntity, null);
				if (CheckUtil.isNull(put)) {
					logger.error(String.format("错误的数据行%s.", hBaseRecordEntity));
					continue;
				}
				put.setWriteToWAL(isWriteToWAL);
				putList.add(put);
			}
			// commitPut clears the list, so capture the real row count first.
			// Logging the input size would over-report when rows were skipped.
			int writtenCount = putList.size();
			commitPut(hTableInterface, putList);
			logger.info(String.format("插入%s条数据到HBase表[%s].",
					writtenCount, tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(hTableInterface);
		}
	}

	/**
	 * Writes the record only if its row does not exist yet; delegates to
	 * {@code merge} in insert mode.
	 *
	 * @param tableName
	 *            target table name
	 * @param hBaseRecordEntity
	 *            record to insert
	 * @return {@code true} if the row was written, {@code false} if
	 *         {@code merge} declined to write it
	 * @throws HBaseDaoException
	 *             propagated from {@code merge}
	 */
	@Override
	public boolean insert(String tableName, HBaseRecordEntity hBaseRecordEntity)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseRecordEntity);
		final boolean insertMode = true;
		return merge(tableName, hBaseRecordEntity, insertMode);
	}

	/**
	 * Writes the record only if its row already exists; delegates to
	 * {@code merge} in update mode.
	 *
	 * @param tableName
	 *            target table name
	 * @param hBaseRecordEntity
	 *            record to update
	 * @return {@code true} if the row was written, {@code false} if
	 *         {@code merge} declined to write it
	 * @throws HBaseDaoException
	 *             propagated from {@code merge}
	 */
	@Override
	public boolean update(String tableName, HBaseRecordEntity hBaseRecordEntity)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseRecordEntity);
		final boolean insertMode = false;
		return merge(tableName, hBaseRecordEntity, insertMode);
	}

	/**
	 * Conditionally writes one row while holding an explicit row lock around
	 * both the existence check and the write.
	 * <ul>
	 * <li>Insert mode ({@code ifInsertElseUpdate == true}): writes only when
	 * the row does not exist.</li>
	 * <li>Update mode ({@code ifInsertElseUpdate == false}): writes only when
	 * the row already exists.</li>
	 * </ul>
	 * The existence probe uses a key-only / first-key-only filter so that no
	 * cell values are transferred.
	 *
	 * @param tableName
	 *            target table name
	 * @param hBaseRecordEntity
	 *            record to write
	 * @param ifInsertElseUpdate
	 *            {@code true} for insert semantics, {@code false} for update
	 * @return {@code true} if the row was written; {@code false} if the
	 *         existence precondition failed or the record holds no cell data
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	@Override
	protected boolean merge(String tableName,
			HBaseRecordEntity hBaseRecordEntity, boolean ifInsertElseUpdate)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseRecordEntity);
		HTableInterface hTableInterface = null;
		RowLock rowLock = null;
		try {
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			byte[] rowKey = Bytes.toBytes(hBaseRecordEntity.getRowKey());
			rowLock = lockRow(hTableInterface, rowKey);
			Get get = new Get(rowKey, rowLock);
			addKeyAndFirstKeyOnlyFilter(get);
			boolean rowExists = !hTableInterface.get(get).isEmpty();
			// Insert needs a missing row and update needs an existing one,
			// so the write is refused whenever the two flags agree.
			if (rowExists == ifInsertElseUpdate) {
				return false;
			}
			Put put = createPut(hBaseRecordEntity, rowLock);
			if (CheckUtil.isNull(put)) {
				// createPut returns null when the record has no cell data;
				// submitting a null Put would fail inside the client.
				logger.error(String.format("错误的数据行%s.", hBaseRecordEntity));
				return false;
			}
			List<Put> putList = Lists.newArrayList();
			putList.add(put);
			commitPut(hTableInterface, putList);
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			try {
				// Only unlock what was actually acquired; otherwise an NPE
				// here would mask the original failure.
				if (CheckUtil.isNotNull(hTableInterface)
						&& CheckUtil.isNotNull(rowLock)) {
					unlockRow(hTableInterface, rowLock);
				}
			} finally {
				// Return the table to the pool even if unlocking failed.
				HTablePoolEngine.closeHTable(hTableInterface);
			}
		}
	}

	/**
	 * Scans the named table with the optional query settings and wraps the
	 * rows in an {@link HBaseQueryResultEntity} whose total count equals the
	 * number of rows returned.
	 *
	 * @param tableName
	 *            table to scan
	 * @param hBaseQueryEntity
	 *            scan settings; passed through to {@code getResultList}, which
	 *            accepts {@code null}
	 * @return the collected rows and their count
	 * @throws HBaseDaoException
	 *             propagated from {@code getResultList}
	 */
	@Override
	public HBaseQueryResultEntity query(String tableName,
			HBaseQueryEntity hBaseQueryEntity) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		logger.info(String.format("开始查询HBase表[%s]数据.", tableName));
		List<Result> rows = getResultList(tableName, hBaseQueryEntity);
		HBaseQueryResultEntity queryResult = new HBaseQueryResultEntity();
		queryResult.setResultList(rows);
		queryResult.setTotalCount(rows.size());
		logger.info(String.format("结束查询HBase表[%s]数据.", tableName));
		return queryResult;
	}

	/**
	 * Fetches the single row whose key is the query's start row key and wraps
	 * it in an {@link HBaseQueryResultEntity}.
	 * <p>
	 * NOTE(review): the total count is always set to 1 and the list always
	 * holds one {@link Result}, even when that result is empty (row not
	 * found). Confirm callers expect this.
	 *
	 * @param tableName
	 *            table to read from
	 * @param hBaseQueryEntity
	 *            query settings; its start row key identifies the row
	 * @return a result entity holding exactly one {@link Result}
	 * @throws HBaseDaoException
	 *             propagated from {@code getResult}
	 */
	@Override
	public HBaseQueryResultEntity get(String tableName,
			HBaseQueryEntity hBaseQueryEntity) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseQueryEntity);
		CheckUtil.checkEmptyString(hBaseQueryEntity.getStartRowKey());
		logger.info(String.format("开始查询HBase表[%s]数据.", tableName));
		HBaseQueryResultEntity queryResult = new HBaseQueryResultEntity();
		queryResult.setTotalCount(1);
		Result row = getResult(tableName, hBaseQueryEntity);
		List<Result> rows = Lists.newArrayList();
		rows.add(row);
		queryResult.setResultList(rows);
		logger.info(String.format("结束查询HBase表[%s]数据.", tableName));
		return queryResult;
	}

	/**
	 * Deletes whole rows: builds one {@link Delete} per row key, submits and
	 * flushes them via {@code commitDelete}, and returns the table to the
	 * pool.
	 *
	 * @param tableName
	 *            table to delete from
	 * @param rowKeyList
	 *            keys of the rows to remove
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	@Override
	public void delete(String tableName, List<String> rowKeyList)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyCollection(rowKeyList);
		logger.info(String.format("开始删除HBase表[%s]数据.", tableName));
		HTableInterface table = null;
		try {
			List<Delete> pendingDeletes = Lists.newArrayList();
			table = HTablePoolEngine.getHTable(tableName, hBaseInstanceName);
			for (String rowKey : rowKeyList) {
				pendingDeletes.add(new Delete(Bytes.toBytes(rowKey)));
			}
			commitDelete(table, pendingDeletes);
			logger.info(String.format("删除HBase表[%s]数据%s.", tableName,
					rowKeyList));
			logger.info(String.format("结束删除HBase表[%s]数据.", tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(table);
		}
	}

	/**
	 * Deletes the given column families from a single row. All families are
	 * accumulated on one {@link Delete}, which is submitted exactly once.
	 *
	 * @param tableName
	 *            table to delete from
	 * @param rowKey
	 *            key of the row to modify
	 * @param familys
	 *            column families to remove from the row
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	@Override
	public void delete(String tableName, String rowKey, String... familys)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(rowKey);
		CheckUtil.checkEmptyArray(familys);
		logger.info(String.format("开始删除HBase表[%s]数据.", tableName));
		HTableInterface hTableInterface = null;
		try {
			List<Delete> deleteList = Lists.newArrayList();
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			Delete delete = new Delete(Bytes.toBytes(rowKey));
			for (String family : familys) {
				delete.deleteFamily(Bytes.toBytes(family));
			}
			// Add the Delete once, after every family is registered. Adding it
			// inside the loop submitted the same object once per family.
			deleteList.add(delete);
			commitDelete(hTableInterface, deleteList);
			logger.info(String.format("删除HBase表[%s]数据[%s %s].", tableName,
					rowKey, Arrays.asList(familys)));
			logger.info(String.format("结束删除HBase表[%s]数据.", tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(hTableInterface);
		}
	}

	/**
	 * Deletes the given columns of one family from a single row. All columns
	 * are accumulated on one {@link Delete}, which is submitted exactly once.
	 * <p>
	 * NOTE(review): this uses {@code Delete.deleteColumn}, which in the HBase
	 * client targets a single version of the cell rather than all versions;
	 * confirm that this is the intended semantics.
	 *
	 * @param tableName
	 *            table to delete from
	 * @param rowKey
	 *            key of the row to modify
	 * @param family
	 *            column family that owns the columns
	 * @param columns
	 *            column qualifiers to remove
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	@Override
	public void delete(String tableName, String rowKey, String family,
			String... columns) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(rowKey);
		CheckUtil.checkEmptyString(family);
		CheckUtil.checkEmptyArray(columns);
		logger.info(String.format("开始删除HBase表[%s]数据.", tableName));
		HTableInterface hTableInterface = null;
		try {
			List<Delete> deleteList = Lists.newArrayList();
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			Delete delete = new Delete(Bytes.toBytes(rowKey));
			for (String column : columns) {
				delete.deleteColumn(Bytes.toBytes(family),
						Bytes.toBytes(column));
			}
			// Add the Delete once, after every column is registered. Adding it
			// inside the loop submitted the same object once per column.
			deleteList.add(delete);
			commitDelete(hTableInterface, deleteList);
			logger.info(String.format("删除HBase表[%s]数据[%s %s %s].", tableName,
					rowKey, family, Arrays.asList(columns)));
			logger.info(String.format("结束删除HBase表[%s]数据.", tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(hTableInterface);
		}
	}

	/**
	 * Builds a {@link Put} from the record's nested
	 * family -> column -> (timestamp -> value) map.
	 * <p>
	 * The {@link Put} is created lazily on the first cell encountered, so a
	 * record whose maps are all empty yields {@code null}.
	 *
	 * @param hBaseRecordEntity
	 *            record supplying the row key and cell data
	 * @param rowLock
	 *            lock to bind the {@link Put} to, or {@code null} for none
	 * @return the populated {@link Put}, or {@code null} if the record holds
	 *         no cell data
	 */
	private Put createPut(HBaseRecordEntity hBaseRecordEntity, RowLock rowLock) {
		CheckUtil.checkNull(hBaseRecordEntity);
		Map<String, Map<String, Map<Long, String>>> families = hBaseRecordEntity
				.getFamilyMap();
		if (CheckUtil.isEmpty(families)) {
			return null;
		}
		Put put = null;
		for (Map.Entry<String, Map<String, Map<Long, String>>> familyEntry : families
				.entrySet()) {
			Map<String, Map<Long, String>> columns = familyEntry.getValue();
			if (CheckUtil.isEmpty(columns)) {
				continue;
			}
			for (Map.Entry<String, Map<Long, String>> columnEntry : columns
					.entrySet()) {
				Map<Long, String> versions = columnEntry.getValue();
				if (CheckUtil.isEmpty(versions)) {
					continue;
				}
				for (Map.Entry<Long, String> versionEntry : versions
						.entrySet()) {
					long timeStamp = versionEntry.getKey();
					if (put == null) {
						byte[] rowKey = Bytes.toBytes(hBaseRecordEntity
								.getRowKey());
						put = (rowLock != null) ? new Put(rowKey, rowLock)
								: new Put(rowKey);
					}
					byte[] familyBytes = Bytes.toBytes(familyEntry.getKey());
					byte[] columnBytes = Bytes.toBytes(columnEntry.getKey());
					if (timeStamp > 0) {
						put.add(familyBytes, columnBytes, timeStamp,
								Bytes.toBytes(versionEntry.getValue()));
					} else {
						// With no explicit timestamp, the server's current
						// time is used as the cell timestamp.
						put.add(familyBytes, columnBytes,
								Bytes.toBytes(versionEntry.getValue()));
					}
				}
			}
		}
		return put;
	}

	/**
	 * Sends the puts to the table without flushing, then empties the list.
	 * Does nothing when the list is empty.
	 *
	 * @param hTable
	 *            table to write to
	 * @param putList
	 *            puts to submit; cleared after a successful submit
	 * @throws IOException
	 *             if the HBase client fails to accept the puts
	 */
	private void submitPut(HTableInterface hTable, List<Put> putList)
			throws IOException {
		if (!CheckUtil.isNotEmpty(putList)) {
			return;
		}
		hTable.put(putList);
		putList.clear();
	}

	/**
	 * Sends the puts to the table, flushes the client write buffer, then
	 * empties the list. Does nothing when the list is empty.
	 *
	 * @param hTable
	 *            table to write to
	 * @param putList
	 *            puts to commit; cleared after a successful commit
	 * @throws IOException
	 *             if the put or the flush fails
	 */
	private void commitPut(HTableInterface hTable, List<Put> putList)
			throws IOException {
		if (!CheckUtil.isNotEmpty(putList)) {
			return;
		}
		hTable.put(putList);
		hTable.flushCommits();
		putList.clear();
	}

	/**
	 * Installs a filter list holding a {@link KeyOnlyFilter} and a
	 * {@link FirstKeyOnlyFilter} on the given {@link Get}. Used for the row
	 * existence probes in this class.
	 *
	 * @param get
	 *            the get to restrict
	 */
	private void addKeyAndFirstKeyOnlyFilter(Get get) {
		FilterList existenceProbeFilters = new FilterList();
		existenceProbeFilters.addFilter(new KeyOnlyFilter());
		existenceProbeFilters.addFilter(new FirstKeyOnlyFilter());
		get.setFilter(existenceProbeFilters);
	}

	/**
	 * Acquires an explicit row lock, translating client I/O failures into
	 * {@link HBaseDaoException}.
	 *
	 * @param hTableInterface
	 *            table that owns the row
	 * @param rowKey
	 *            key of the row to lock
	 * @return the acquired lock
	 * @throws HBaseDaoException
	 *             wrapping the {@link IOException} raised by the client
	 */
	private RowLock lockRow(HTableInterface hTableInterface, byte[] rowKey)
			throws HBaseDaoException {
		RowLock acquired;
		try {
			acquired = hTableInterface.lockRow(rowKey);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		}
		return acquired;
	}

	/**
	 * Releases a row lock, translating client I/O failures into
	 * {@link HBaseDaoException}.
	 * <p>
	 * Does nothing when either argument is {@code null}. This method is called
	 * from {@code finally} blocks, where the table or the lock may never have
	 * been acquired; throwing an NPE there would mask the original failure.
	 *
	 * @param hTableInterface
	 *            table that owns the row, may be {@code null}
	 * @param rowLock
	 *            lock to release, may be {@code null}
	 * @throws HBaseDaoException
	 *             wrapping the {@link IOException} raised by the client
	 */
	private void unlockRow(HTableInterface hTableInterface, RowLock rowLock)
			throws HBaseDaoException {
		if (CheckUtil.isNull(hTableInterface) || CheckUtil.isNull(rowLock)) {
			return;
		}
		try {
			hTableInterface.unlockRow(rowLock);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		}
	}

	/**
	 * Closes the resource if it is non-null, translating I/O failures into
	 * {@link HBaseDaoException}.
	 *
	 * @param closeable
	 *            resource to close, may be {@code null}
	 * @throws HBaseDaoException
	 *             wrapping the {@link IOException} raised by {@code close()}
	 */
	private void close(Closeable closeable) throws HBaseDaoException {
		if (!CheckUtil.isNotNull(closeable)) {
			return;
		}
		try {
			closeable.close();
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		}
	}

	/**
	 * Copies the query settings onto the {@link Scan}: filters, row range,
	 * max versions, block caching, scanner caching, time range or timestamp,
	 * and column selection.
	 * <p>
	 * Column selection: when a column map is supplied it wins and the family
	 * list is ignored; otherwise the listed families are added whole.
	 * <p>
	 * NOTE(review): a non-empty prefix row key is added as a
	 * {@link PrefixFilter} to the query entity itself, so calling this twice
	 * with the same entity appears to add the filter twice. Confirm against
	 * {@code HBaseQueryEntity.addFilter}.
	 *
	 * @param scan
	 *            scan to configure
	 * @param hBaseQueryEntity
	 *            source of the settings
	 * @throws IOException
	 *             declared because the HBase time-range setters may throw it
	 */
	private void optimizeScan(Scan scan, HBaseQueryEntity hBaseQueryEntity)
			throws IOException {
		CheckUtil.checkNull(scan);
		CheckUtil.checkNull(hBaseQueryEntity);

		String prefixRowKey = hBaseQueryEntity.getPrefixRowKey();
		if (StringUtils.isNotEmpty(prefixRowKey)) {
			hBaseQueryEntity.addFilter(new PrefixFilter(Bytes
					.toBytes(prefixRowKey)));
		}
		FilterList filters = hBaseQueryEntity.getFilterList();
		if (CheckUtil.isNotNull(filters)
				&& CheckUtil.isNotEmpty(filters.getFilters())) {
			scan.setFilter(filters);
		}

		String startRowKey = hBaseQueryEntity.getStartRowKey();
		if (StringUtils.isNotEmpty(startRowKey)) {
			scan.setStartRow(Bytes.toBytes(startRowKey));
		}
		String stopRowKey = hBaseQueryEntity.getStopRowKey();
		if (StringUtils.isNotEmpty(stopRowKey)) {
			scan.setStopRow(Bytes.toBytes(stopRowKey));
		}

		int maxVersions = hBaseQueryEntity.getMaxVersions();
		scan.setMaxVersions(maxVersions > 0 ? maxVersions
				: HBaseConstant.HBASE_DEFAULT_VERSIONS);

		scan.setCacheBlocks(hBaseQueryEntity.isCache());
		int cacheSize = hBaseQueryEntity.getCacheSize();
		if (cacheSize > 0) {
			/*
			 * The hbase.client.scanner.caching setting controls how many rows
			 * the HBase scanner fetches from the server per round trip; by
			 * default it fetches one row at a time. A reasonable value cuts
			 * the time spent in next() during a scan, at the cost of client
			 * memory to hold the cached rows. It can be configured in three
			 * places: 1) the HBase configuration file; 2)
			 * HTable.setScannerCaching(int scannerCaching); 3)
			 * Scan.setCaching(int caching). Priority increases in that order.
			 */
			scan.setCaching(cacheSize);
		}

		long minStamp = hBaseQueryEntity.getMinStamp();
		long maxStamp = hBaseQueryEntity.getMaxStamp();
		if (minStamp > 0 && maxStamp > 0) {
			scan.setTimeRange(minStamp, maxStamp);
		} else if (hBaseQueryEntity.getTimeStamp() > 0) {
			scan.setTimeStamp(hBaseQueryEntity.getTimeStamp());
		}

		if (CheckUtil.isNotEmpty(hBaseQueryEntity.getColumnMap())) {
			// Explicit columns take precedence over whole families.
			filterColumn(scan, hBaseQueryEntity.getColumnMap());
		} else if (CheckUtil.isNotEmpty(hBaseQueryEntity.getFamilyList())) {
			for (String family : hBaseQueryEntity.getFamilyList()) {
				scan.addFamily(Bytes.toBytes(family));
			}
		}
	}

	/**
	 * Copies the query settings onto the {@link Get}: max versions, time
	 * range or timestamp, block caching and column selection.
	 * <p>
	 * Column selection: when a column map is supplied it wins and the family
	 * list is ignored; otherwise the listed families are added whole.
	 *
	 * @param get
	 *            get to configure
	 * @param hBaseQueryEntity
	 *            source of the settings
	 * @throws IOException
	 *             declared because the HBase time-range setters may throw it
	 */
	private void optimizeGet(Get get, HBaseQueryEntity hBaseQueryEntity)
			throws IOException {
		CheckUtil.checkNull(get);
		CheckUtil.checkNull(hBaseQueryEntity);

		int maxVersions = hBaseQueryEntity.getMaxVersions();
		get.setMaxVersions(maxVersions > 0 ? maxVersions
				: HBaseConstant.HBASE_DEFAULT_VERSIONS);

		long minStamp = hBaseQueryEntity.getMinStamp();
		long maxStamp = hBaseQueryEntity.getMaxStamp();
		if (minStamp > 0 && maxStamp > 0) {
			get.setTimeRange(minStamp, maxStamp);
		} else if (hBaseQueryEntity.getTimeStamp() > 0) {
			get.setTimeStamp(hBaseQueryEntity.getTimeStamp());
		}

		get.setCacheBlocks(hBaseQueryEntity.isCache());

		if (CheckUtil.isNotEmpty(hBaseQueryEntity.getColumnMap())) {
			// Explicit columns take precedence over whole families.
			filterColumn(get, hBaseQueryEntity.getColumnMap());
		} else if (CheckUtil.isNotEmpty(hBaseQueryEntity.getFamilyList())) {
			for (String family : hBaseQueryEntity.getFamilyList()) {
				get.addFamily(Bytes.toBytes(family));
			}
		}
	}

	/**
	 * Restricts the scan to the listed columns. Families mapped to an empty
	 * column list are skipped.
	 *
	 * @param scan
	 *            scan to restrict
	 * @param columnMap
	 *            family name mapped to the column qualifiers to read
	 */
	private void filterColumn(Scan scan, Map<String, List<String>> columnMap) {
		for (Map.Entry<String, List<String>> familyEntry : columnMap
				.entrySet()) {
			List<String> qualifiers = familyEntry.getValue();
			if (CheckUtil.isEmpty(qualifiers)) {
				continue;
			}
			for (String qualifier : qualifiers) {
				scan.addColumn(Bytes.toBytes(familyEntry.getKey()),
						Bytes.toBytes(qualifier));
			}
		}
	}

	/**
	 * Restricts the get to the listed columns. Families mapped to an empty
	 * column list are skipped.
	 *
	 * @param get
	 *            get to restrict
	 * @param columnMap
	 *            family name mapped to the column qualifiers to read
	 */
	private void filterColumn(Get get, Map<String, List<String>> columnMap) {
		for (Map.Entry<String, List<String>> familyEntry : columnMap
				.entrySet()) {
			List<String> qualifiers = familyEntry.getValue();
			if (CheckUtil.isEmpty(qualifiers)) {
				continue;
			}
			for (String qualifier : qualifiers) {
				get.addColumn(Bytes.toBytes(familyEntry.getKey()),
						Bytes.toBytes(qualifier));
			}
		}
	}

	/**
	 * Submits the deletes to the table, flushes the client write buffer, then
	 * empties the list. Does nothing when the list is empty.
	 *
	 * @param hTable
	 *            table to delete from
	 * @param deleteList
	 *            deletes to commit; cleared after a successful commit
	 * @throws IOException
	 *             if the delete or the flush fails
	 */
	private void commitDelete(HTableInterface hTable, List<Delete> deleteList)
			throws IOException {
		if (!CheckUtil.isNotEmpty(deleteList)) {
			return;
		}
		hTable.delete(deleteList);
		hTable.flushCommits();
		deleteList.clear();
	}

	/**
	 * Writes a single annotated entity by wrapping it in a one-element list
	 * and delegating to the single-argument list overload of
	 * {@code putObject} (not defined in this file).
	 *
	 * @param t
	 *            entity to write, checked with {@code CheckUtil.checkNull}
	 * @throws HBaseDaoException
	 *             propagated from the delegated call
	 */
	@Override
	public <T> void putObject(T t) throws HBaseDaoException {
		CheckUtil.checkNull(t);
		List<T> singleEntity = Lists.newArrayList();
		singleEntity.add(t);
		putObject(singleEntity);
	}

	/**
	 * Converts annotated entities to {@link Put}s and hands them to the
	 * supplied table via {@code submitPut}, which calls
	 * {@code HTableInterface.put} without flushing. The table is neither
	 * flushed nor closed here; both are left to the caller who owns it.
	 * <p>
	 * The mapping metadata is taken from the class of the first element, so
	 * all elements are resolved with that one annotation model. Entities that
	 * resolve to {@code null} are logged as errors and skipped.
	 *
	 * @param hTableInterface
	 *            caller-owned table to write to
	 * @param tList
	 *            entities to write
	 * @param isWriteToWAL
	 *            value passed to {@code Put.setWriteToWAL} for every row
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} raised by the write
	 */
	@Override
	public <T> void putObject(HTableInterface hTableInterface, List<T> tList,
			boolean isWriteToWAL) throws HBaseDaoException {
		CheckUtil.checkNull(hTableInterface);
		CheckUtil.checkEmptyCollection(tList);
		HBaseTableEntityAnnotation hBaseTableEntityAnnotation = HBaseTableEntityAnnotationManage
				.getEntityAnnotation(tList.get(0).getClass());
		try {
			List<Put> putList = Lists.newArrayList();
			for (T t : tList) {
				Put put = HBaseEntityResolve.resolve(t, null,
						hBaseTableEntityAnnotation);
				if (CheckUtil.isNull(put)) {
					logger.error(String.format("错误的数据行%s.", t));
					continue;
				}
				put.setWriteToWAL(isWriteToWAL);
				putList.add(put);
			}
			// submitPut clears the list, so capture the real row count first.
			// Logging the input size would over-report when rows were skipped.
			int writtenCount = putList.size();
			submitPut(hTableInterface, putList);
			logger.info(String.format("插入%s条数据到HBase表[%s].", writtenCount,
					Bytes.toString(hTableInterface.getTableName())));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		}

	}

	/**
	 * Writes annotated entities to the table named by their mapping
	 * annotation: obtains the table from {@code HTablePoolEngine}, resolves
	 * each entity to a {@link Put}, writes and flushes via {@code commitPut},
	 * and returns the table to the pool in all cases.
	 * <p>
	 * The mapping metadata (including the table name) is taken from the class
	 * of the first element. Entities that resolve to {@code null} are logged
	 * as errors and skipped.
	 *
	 * @param tList
	 *            entities to write
	 * @param isWriteToWAL
	 *            value passed to {@code Put.setWriteToWAL} for every row
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} raised by the write
	 */
	@Override
	public <T> void putObject(List<T> tList, boolean isWriteToWAL)
			throws HBaseDaoException {
		CheckUtil.checkEmptyCollection(tList);
		HBaseTableEntityAnnotation hBaseTableEntityAnnotation = HBaseTableEntityAnnotationManage
				.getEntityAnnotation(tList.get(0).getClass());
		String tableName = hBaseTableEntityAnnotation.getHBaseTable()
				.tableName();
		CheckUtil.checkEmptyString(tableName);
		HTableInterface hTableInterface = null;
		try {
			List<Put> putList = Lists.newArrayList();
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			for (T t : tList) {
				Put put = HBaseEntityResolve.resolve(t, null,
						hBaseTableEntityAnnotation);
				if (CheckUtil.isNull(put)) {
					// Logged at error level, matching the other put methods
					// in this class, which report skipped rows as errors.
					logger.error(String.format("错误的数据行%s.", t));
					continue;
				}
				put.setWriteToWAL(isWriteToWAL);
				putList.add(put);
			}
			// commitPut clears the list, so capture the real row count first.
			// Logging the input size would over-report when rows were skipped.
			int writtenCount = putList.size();
			commitPut(hTableInterface, putList);
			logger.info(String.format("插入%s条数据到HBase表[%s].", writtenCount,
					tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(hTableInterface);
		}
	}

	/**
	 * Writes the annotated entity only if its row does not exist yet;
	 * delegates to {@code mergeObject} in insert mode.
	 *
	 * @param t
	 *            entity to insert
	 * @return {@code true} if the row was written, {@code false} if
	 *         {@code mergeObject} declined to write it
	 * @throws HBaseDaoException
	 *             propagated from {@code mergeObject}
	 */
	@Override
	public <T> boolean insertObject(T t) throws HBaseDaoException {
		CheckUtil.checkNull(t);
		final boolean insertMode = true;
		return mergeObject(t, insertMode);
	}

	/**
	 * Writes the annotated entity only if its row already exists; delegates
	 * to {@code mergeObject} in update mode.
	 *
	 * @param t
	 *            entity to update
	 * @return {@code true} if the row was written, {@code false} if
	 *         {@code mergeObject} declined to write it
	 * @throws HBaseDaoException
	 *             propagated from {@code mergeObject}
	 */
	@Override
	public <T> boolean updateObject(T t) throws HBaseDaoException {
		CheckUtil.checkNull(t);
		final boolean insertMode = false;
		return mergeObject(t, insertMode);
	}

	/**
	 * Scans the table mapped to {@code clazz} and converts every returned row
	 * into an instance of {@code clazz}.
	 *
	 * @param clazz
	 *            annotated entity class; supplies the table name and mapping
	 * @param hBaseQueryEntity
	 *            scan settings; passed through to {@code getResultList},
	 *            which accepts {@code null}
	 * @return one entity per row returned by the scan
	 * @throws HBaseDaoException
	 *             propagated from {@code getResultList}
	 */
	@Override
	public <T> List<T> queryObject(Class<T> clazz,
			HBaseQueryEntity hBaseQueryEntity) throws HBaseDaoException {
		CheckUtil.checkNull(clazz);
		HBaseTableEntityAnnotation entityAnnotation = HBaseTableEntityAnnotationManage
				.getEntityAnnotation(clazz);
		String tableName = entityAnnotation.getHBaseTable().tableName();
		CheckUtil.checkEmptyString(tableName);
		logger.info(String.format("开始查询HBase表[%s]数据.", tableName));
		List<T> entities = Lists.newArrayList();
		List<Result> rows = getResultList(tableName, hBaseQueryEntity);
		for (Result row : rows) {
			T entity = HBaseEntityResolve.resolve(row, clazz,
					entityAnnotation);
			entities.add(entity);
		}
		logger.info(String.format("结束查询HBase表[%s]数据.", tableName));
		return entities;
	}

	/**
	 * Fetches the single row identified by the query's start row key from the
	 * table mapped to {@code clazz} and converts it into an instance of
	 * {@code clazz}.
	 *
	 * @param clazz
	 *            annotated entity class; supplies the table name and mapping
	 * @param hBaseQueryEntity
	 *            query settings; its start row key identifies the row
	 * @return whatever {@code HBaseEntityResolve.resolve} produces for the
	 *         fetched row
	 * @throws HBaseDaoException
	 *             propagated from {@code getResult}
	 */
	@Override
	public <T> T getObject(Class<T> clazz, HBaseQueryEntity hBaseQueryEntity)
			throws HBaseDaoException {
		CheckUtil.checkNull(clazz);
		HBaseTableEntityAnnotation entityAnnotation = HBaseTableEntityAnnotationManage
				.getEntityAnnotation(clazz);
		String tableName = entityAnnotation.getHBaseTable().tableName();
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseQueryEntity);
		CheckUtil.checkEmptyString(hBaseQueryEntity.getStartRowKey());
		logger.info(String.format("开始查询HBase表[%s]数据.", tableName));
		Result row = getResult(tableName, hBaseQueryEntity);
		T entity = HBaseEntityResolve.resolve(row, clazz, entityAnnotation);
		logger.info(String.format("结束查询HBase表[%s]数据.", tableName));
		return entity;
	}

	/**
	 * Conditionally writes one annotated entity while holding an explicit row
	 * lock around both the existence check and the write.
	 * <ul>
	 * <li>Insert mode ({@code ifInsertElseUpdate == true}): writes only when
	 * the row does not exist.</li>
	 * <li>Update mode ({@code ifInsertElseUpdate == false}): writes only when
	 * the row already exists.</li>
	 * </ul>
	 * The table name and row key come from the entity's mapping annotation.
	 *
	 * @param t
	 *            entity to write
	 * @param ifInsertElseUpdate
	 *            {@code true} for insert semantics, {@code false} for update
	 * @return {@code true} if the row was written; {@code false} if the
	 *         existence precondition failed or the entity resolved to no
	 *         {@link Put}
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	@Override
	protected <T> boolean mergeObject(T t, boolean ifInsertElseUpdate)
			throws HBaseDaoException {
		CheckUtil.checkNull(t);
		HBaseTableEntityAnnotation hBaseTableEntityAnnotation = HBaseTableEntityAnnotationManage
				.getEntityAnnotation(t.getClass());
		String tableName = hBaseTableEntityAnnotation.getHBaseTable()
				.tableName();
		CheckUtil.checkEmptyString(tableName);
		HTableInterface hTableInterface = null;
		RowLock rowLock = null;
		try {
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			byte[] rowKey = hBaseTableEntityAnnotation.getRowKey(t);
			rowLock = lockRow(hTableInterface, rowKey);
			Get get = new Get(rowKey, rowLock);
			addKeyAndFirstKeyOnlyFilter(get);
			boolean rowExists = !hTableInterface.get(get).isEmpty();
			// Insert needs a missing row and update needs an existing one,
			// so the write is refused whenever the two flags agree.
			if (rowExists == ifInsertElseUpdate) {
				return false;
			}
			Put put = HBaseEntityResolve.resolve(t, rowLock,
					hBaseTableEntityAnnotation);
			if (CheckUtil.isNull(put)) {
				// The put methods in this class treat a null Put as an
				// invalid row; submitting it would fail inside the client.
				logger.error(String.format("错误的数据行%s.", t));
				return false;
			}
			List<Put> putList = Lists.newArrayList();
			putList.add(put);
			commitPut(hTableInterface, putList);
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			try {
				// Only unlock what was actually acquired; otherwise an NPE
				// here would mask the original failure.
				if (CheckUtil.isNotNull(hTableInterface)
						&& CheckUtil.isNotNull(rowLock)) {
					unlockRow(hTableInterface, rowLock);
				}
			} finally {
				// Return the table to the pool even if unlocking failed.
				HTablePoolEngine.closeHTable(hTableInterface);
			}
		}
	}

	/**
	 * Scans the named table and collects up to {@code maxRows} results.
	 * <p>
	 * When a query entity is given, its settings are applied to the scan via
	 * {@code optimizeScan} and a positive {@code getMaxRows()} caps the number
	 * of rows; otherwise the scan is unrestricted. Rows are pulled from the
	 * scanner in batches of {@code min(maxRows, scanner caching)}.
	 * <p>
	 * The scanner is closed and the table is returned to the pool on every
	 * path, including when closing the scanner itself fails.
	 *
	 * @param tableName
	 *            table to scan
	 * @param hBaseQueryEntity
	 *            scan settings, may be {@code null}
	 * @return the collected rows, never {@code null}
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	private List<Result> getResultList(String tableName,
			HBaseQueryEntity hBaseQueryEntity) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		HTableInterface hTableInterface = null;
		ResultScanner resultScanner = null;
		List<Result> resultList = Lists.newArrayList();
		int maxRows = Integer.MAX_VALUE;
		try {
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			Scan scan = new Scan();
			if (CheckUtil.isNotNull(hBaseQueryEntity)) {
				logger.info(String.format("查询条件%s.", hBaseQueryEntity));
				if (hBaseQueryEntity.getMaxRows() > 0) {
					maxRows = hBaseQueryEntity.getMaxRows();
				}
				optimizeScan(scan, hBaseQueryEntity);
			}
			resultScanner = hTableInterface.getScanner(scan);
			int cacheSize = scan.getCaching() > 0 ? scan.getCaching()
					: HBaseConstant.HBASE_SCAN_CACHE_SIZE;
			// Never ask for more rows per round trip than the caller wants.
			int batch = Math.min(maxRows, cacheSize);
			Result[] results = resultScanner.next(batch);
			collect: while (CheckUtil.isNotEmpty(results)) {
				for (Result result : results) {
					resultList.add(result);
					if (resultList.size() >= maxRows) {
						break collect;
					}
				}
				results = resultScanner.next(batch);
			}
			logger.info(String.format("查询到%s条数据.", resultList.size()));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			try {
				close(resultScanner);
			} finally {
				// Return the table to the pool even if closing the scanner
				// throws.
				HTablePoolEngine.closeHTable(hTableInterface);
			}
		}
		return resultList;
	}

	/**
	 * Fetches one row, keyed by the query's start row key, applying the query
	 * settings to the {@link Get} via {@code optimizeGet}. The table is
	 * returned to the pool on every path.
	 *
	 * @param tableName
	 *            table to read from
	 * @param hBaseQueryEntity
	 *            query settings; must carry a non-empty start row key
	 * @return the {@link Result} returned by the HBase client
	 * @throws HBaseDaoException
	 *             wrapping any {@link IOException} from the HBase client
	 */
	private Result getResult(String tableName, HBaseQueryEntity hBaseQueryEntity)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkNull(hBaseQueryEntity);
		CheckUtil.checkEmptyString(hBaseQueryEntity.getStartRowKey());
		HTableInterface table = null;
		try {
			table = HTablePoolEngine.getHTable(tableName, hBaseInstanceName);
			byte[] rowKey = Bytes.toBytes(hBaseQueryEntity.getStartRowKey());
			Get get = new Get(rowKey);
			// The entity was already dereferenced above, so it is non-null
			// here and the settings are always applied.
			logger.info(String.format("查询条件%s.", hBaseQueryEntity));
			optimizeGet(get, hBaseQueryEntity);
			return table.get(get);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			HTablePoolEngine.closeHTable(table);
		}
	}

}